
WIP: Feature: Add S3 Storage Support for Stream Queues#1176

Draft
viktorerlingsson wants to merge 7 commits into main from s3-streams

Conversation

Member

@viktorerlingsson viktorerlingsson commented Jun 26, 2025

Work in progress!

This PR introduces S3 storage support for LavinMQ stream queues, enabling efficient long-term storage and reduced local disk usage for stream data. By offloading historical data to S3 while keeping an intelligent local cache for active consumers, LavinMQ can handle very large streams without sacrificing performance.

Throughput is lower than with regular streams, but in best-case scenarios it should be close.

WHAT is this pull request doing?

Summary

  • New S3 Message Store: Implements StreamS3MessageStore class that extends the existing stream queue functionality
    with S3 backend storage
  • Automatic Upload/Download: Segments are automatically uploaded to S3 when closed and downloaded on-demand when
    needed by consumers
  • Smart Local Caching: Maintains a configurable number of local segments based on active consumer positions to
    optimize performance

Key Features

  • On-demand segment downloading: Only downloads segments that consumers need, reducing local storage requirements
  • Predictive caching: Downloads segments ahead of current consumer positions to ensure smooth streaming
  • Automatic cleanup: Removes local segments when they're no longer needed while keeping them in S3
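As a rough illustration of the caching policy described above, here is a Python sketch (not LavinMQ's actual Crystal implementation; the function name, numeric segment ids, and the `window` parameter, which plays the role of `local_segments_per_stream`, are all illustrative):

```python
def segments_to_keep(consumer_positions, all_segments, window=20):
    """Decide which segments to keep on local disk: start at the
    slowest consumer's current segment and keep up to `window`
    segments from there (prefetching ahead of consumers).
    Everything outside this set stays only in S3."""
    if not consumer_positions:
        # No active consumers: keep only the newest segments locally.
        return set(all_segments[-window:])
    slowest = min(consumer_positions)
    keep = [s for s in sorted(all_segments) if s >= slowest][:window]
    return set(keep)
```

Local segments outside the returned set can be deleted from disk and re-downloaded from S3 on demand.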

Dependencies

  • Adds awscr-signer dependency for AWS authentication

How to test

Add the relevant config to your config file (or pass it as command-line arguments).
Example config:

[s3-storage]
enabled = true
bucket = my-bucket
region = eu-north-1
access_key_id = foo
secret_access_key = bar
local_segments_per_stream = 20
endpoint = s3.eu-north-1.amazonaws.com
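For reference, a minimal Python sketch of reading such an INI section (LavinMQ itself parses its config in Crystal; this only mirrors the example keys shown above):

```python
import configparser

def load_s3_config(path):
    """Read the [s3-storage] section from a LavinMQ-style INI file.
    A sketch only; key names match the example config above."""
    ini = configparser.ConfigParser()
    ini.read(path)
    s3 = ini["s3-storage"]
    return {
        "enabled": s3.getboolean("enabled"),
        "bucket": s3["bucket"],
        "region": s3["region"],
        "access_key_id": s3["access_key_id"],
        "secret_access_key": s3["secret_access_key"],
        "local_segments_per_stream": s3.getint("local_segments_per_stream"),
        "endpoint": s3["endpoint"],
    }
```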

Then just create a stream and start publishing/consuming.

TO-DOs / Known issues

  • Test, test, test!
  • Add specs (some specs added; could/should be extended)
  • Test with other providers
  • Upload the local meta file to S3 if it does not exist there
  • When to delete local meta files?
  • Paging when getting the file list for very large streams
  • Should there be a max time to wait before uploading, i.e. upload every x seconds/minutes regardless of size?
  • Consumer offsets: should they be uploaded somehow?

Implements #1070

HOW can this pull request be tested?

There are some specs, but not for every scenario. Manual testing is still needed.

Contributor

@lukas8219 lukas8219 left a comment


Just small comments, since I haven't been able to run it locally yet.

@carlhoerberg
Member

Found this: https://github.com/ysbaddaden/xml.cr

Member

@spuun spuun left a comment


Dunno if it would be better or not, but maybe it's good to have global download_segments and monitor_downloads fibers. Not for a first version of the s3 queue though.

@cloudamqp cloudamqp deleted a comment from claude bot Aug 18, 2025
@viktorerlingsson viktorerlingsson linked an issue Jan 27, 2026 that may be closed by this pull request
@cloudamqp cloudamqp deleted a comment from claude bot Mar 24, 2026
viktorerlingsson and others added 7 commits March 24, 2026 11:10
S3SegmentCache:
- Replace 3 polling fibers with fixed worker pool + coordinator
- Use Channel-based work distribution instead of spawning ad-hoc fibers
- Add bounded retries (MAX_ATTEMPTS_PER_SEGMENT = 3)
- Protect shared state with mutex
- Add graceful shutdown via close method

S3StorageClient:
- Remove infinite recursion in download methods (callers handle retries)
- Fix upload_file_to_s3 to use proper loop instead of recursion
- Add exponential backoff to s3_segments_from_bucket retries
- Better error handling and HTTP status code checks

S3MessageStore:
- Fix HTTP client leaks (close clients after each use)
- Add mutex synchronization for concurrent segment array access
- Add close method to shut down segment cache
- Fix missing return in find_offset_in_segments recursive call

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
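The worker-pool-with-bounded-retries design described in this commit could look roughly like the following Python sketch (a threading stand-in for the Crystal fibers and Channel-based work distribution; all names except MAX_ATTEMPTS_PER_SEGMENT are illustrative):

```python
import queue
import threading

MAX_ATTEMPTS_PER_SEGMENT = 3  # mirrors the bounded-retry constant above

def run_worker_pool(download, seg_ids, workers=4):
    """Fixed pool of workers fed from a shared queue; each segment is
    attempted at most MAX_ATTEMPTS_PER_SEGMENT times before being
    reported as failed. Shared state is protected by a mutex."""
    work = queue.Queue()
    for sid in seg_ids:
        work.put(sid)
    results, failed = {}, []
    lock = threading.Lock()

    def worker():
        while True:
            try:
                sid = work.get_nowait()
            except queue.Empty:
                return  # no more work: worker shuts down
            for _ in range(MAX_ATTEMPTS_PER_SEGMENT):
                try:
                    data = download(sid)
                except Exception:
                    continue  # retry, up to the bounded limit
                with lock:
                    results[sid] = data
                break
            else:  # all attempts failed
                with lock:
                    failed.append(sid)

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, failed
```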
- S3SegmentCache: Fix DownloadAttempt.copy_with not mutating array
  (record is immutable, need to replace array with mapped values)

- S3StorageClient: Close HTTP client in delete_from_s3 method

- S3MessageStore: Add 30s timeout to next_segment to prevent infinite
  wait if segment cannot be downloaded from S3

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Fix S3 list objects API path (add leading slash)
- Delete segment_msg_count when removing empty segments
- Check segment-specific message count in produce_metadata
- Call reload_logger after parsing all config sources
- Add S3 storage config example to lavinmq.ini

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Refactor S3MessageStore to reuse StreamMessageStore base init instead
  of duplicating segment loading, metadata parsing, and file registration
- Extract segment download into overridable `download_segment` method on
  StreamMessageStore, letting S3 subclass transparently fetch from cache
  or direct download
- Rewrite S3SegmentCache fiber lifecycle: idle shutdown with automatic
  restart on new consumers, channel-based coordination replacing
  busy-wait polling
- Fix segment cache eviction order to remove already-consumed segments
  before speculative prefetch segments
- Fix UInt32 subtraction overflow in prefetch distance calculation
- Reconnect HTTP client on error during S3 bucket listing
- Add specs for pagination, concurrent consumers, segment cache
  prefetching, download failure recovery, and max-length enforcement

claude bot commented Mar 26, 2026

PR Review

Bug: wait_for_segment notification loss with concurrent consumers

File: src/lavinmq/amqp/stream/s3_segment_cache.cr:80-101

The @download_complete channel is used as a notification mechanism for wait_for_segment, but Channel delivers each message to exactly one receiver. When multiple consumer fibers call wait_for_segment for different segment IDs concurrently, they steal each other's notifications, leading to 30-second timeouts.

Scenario:

  1. Consumer A calls wait_for_segment(5), consumer B calls wait_for_segment(7) — both block on @download_complete.receive
  2. Worker completes segment 5, sends 5 to @download_complete
  3. Consumer B receives 5, checks seg_id == completed_id (7 == 5 → false), loops back to receive
  4. Worker completes segment 7, sends 7 to @download_complete
  5. Consumer A receives 7, checks (5 == 7 → false), loops back to receive
  6. No more notifications are sent — both consumers eventually timeout after 30 seconds

This is worsened by the non-blocking send on lines 125-128:

    select
    when @download_complete.send(seg_id)
    else
    end

If the channel buffer (256) is full, notifications are silently dropped.

Suggested fix: Replace the single shared channel with a per-segment or broadcast-capable notification mechanism. For example, use a Hash(UInt32, Array(Channel(Nil))) where each waiter registers its own channel for the specific segment it needs, or add a periodic poll of @segments[seg_id]? as a fallback inside the wait loop (e.g., check every 100ms between channel receives).
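The per-segment-waiter idea can be sketched like this (a Python threading stand-in for Crystal fibers and channels; the class and method names are illustrative, not from the PR):

```python
import threading
from collections import defaultdict

class SegmentNotifier:
    """Each waiter registers its own event keyed by segment id, so a
    completion notification for one segment can never be 'stolen' by a
    consumer waiting on a different segment."""

    def __init__(self):
        self._lock = threading.Lock()
        self._downloaded = set()
        self._waiters = defaultdict(list)  # seg_id -> [Event, ...]

    def wait_for_segment(self, seg_id, timeout=30.0):
        with self._lock:
            if seg_id in self._downloaded:
                return True  # fast path: already on local disk
            ev = threading.Event()
            self._waiters[seg_id].append(ev)
        return ev.wait(timeout)  # True if set, False on timeout

    def mark_downloaded(self, seg_id):
        with self._lock:
            self._downloaded.add(seg_id)
            events = self._waiters.pop(seg_id, [])
        for ev in events:  # wake only the waiters for this segment
            ev.set()
```

Because each event is bound to one segment id, completing segment 7 wakes only segment 7's waiters; the waiter for segment 5 keeps blocking until its own segment arrives or its timeout fires.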


Generated with Claude Code



Development

Successfully merging this pull request may close these issues.

Blob (s3) storage for (stream) queues
